-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-19960: Close pending tasks on shutdown. #21365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Added logic to close pending tasks to init. Made standby task closure similar to the one for active tasks. Added a separate method for getting standby tasks from task registry. Added an integration test that reproduces the issue.
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a first pass.
Can you update the PR description adding context on what the bug exactly is, and when we hit it? It seems to be related to not closing "pending tasks", but might be good to give some more context.
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
...ntegration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsWrapper.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Show resolved
Hide resolved
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Minor refactoring.
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more follow ups -- also need to get input from @lucasbru...
| } else { | ||
| standbyTasks.add(pendingTask); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR description. It say "shutdown during rebalance when active task become standby tasks" but seems it goes either way, and the PR is actually fixing both direction?
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Show resolved
Hide resolved
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good find, thanks for identifying the problem.
An alternative way to fix it would be, I think, to consider pendingTasksToInit as a subset of Tasks.allTasks. Then conceptually, it may be simpler to say "close all tasks". But we'd have to check carefully what other places Tasks.allTasks and related methods are used, and if we are changing one of those places. In that case, we'd still need the "remove" fix you did.
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Show resolved
Hide resolved
On second thought, that would probably create more problems than it solves. But I wonder if we should rename "allTasks" to "allInitializedTasks" then? |
the problem with drain is that we always try to remove the task we are closing from the task registry: https://github.com/Nikita-Shupletsov/kafka/blob/8ee99ee82df2cf89fbb769d26c66395fd3a63761/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1405 if we drain the pending tasks, there will be no way for us to confirm it ever existed there. so we either need to relax the check in the remove method or have a separate branch for closing pending to init tasks |
Yes, that is what I mean by this -- after we drain the pending-tasks-to-init, we would add them into some new "pending-task-to-close" member inside It's to some extend "cosmetics" as we only move all "pending task" from one collection into a different one, but it preserves the invariant we have, and makes the code cleaner. |
|
No problem, it's a minor thing. |
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few follow up comments. -- Still working through the integration test, but might be good to share what I have right away.
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Modified the test so it actually reproduces the issue. Small refactoring.
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Show resolved
Hide resolved
.../src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
…ms/integration/RebalanceTaskClosureIntegrationTest.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
|
Thanks for the fix. -- Merged to Cherry-picking to older branches is not clean. Can you maybe look into it and open new PRs all the way back to |
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> --------- Co-authored-by: Matthias J. Sax <mjsax@apache.org>
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> --------- Co-authored-by: Matthias J. Sax <mjsax@apache.org> Conflicts: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> --------- Co-authored-by: Matthias J. Sax <mjsax@apache.org> Conflicts: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> --------- Co-authored-by: Matthias J. Sax <mjsax@apache.org> Conflicts: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> --------- Co-authored-by: Matthias J. Sax <mjsax@apache.org> Conflicts: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit * Added logic to close pending tasks to init. * Made standby task closure similar to the one for active tasks. * Added a separate method for getting standby tasks from task registry. * Added an integration test that reproduces the issue. Reviewers: Matthias J. Sax <matthias@confluent.io> Conflicts: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
This PR fixes a bug when KS doesn't close stores if the shutdown was
triggered during rebalance where an active tasks gets converted to a
standby one and put into pendingTasksToInit
Reviewers: Matthias J. Sax matthias@confluent.io